package io.mycat.net;

import io.mycat.util.TimeUtil;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.atomic.AtomicBoolean;

public class AIOSocketWR extends SocketWR {
    private static final AIOReadHandler aioReadHandler = new AIOReadHandler();
    private static final AIOWriteHandler aioWriteHandler = new AIOWriteHandler();
    private final AsynchronousSocketChannel channel;
    protected final AbstractConnection con;
    protected final AtomicBoolean writing = new AtomicBoolean(false);

    public AIOSocketWR(AbstractConnection conn) {
        channel = (AsynchronousSocketChannel) conn.getChannel();
        this.con = conn;
    }

    @Override
    public void asynRead() {
        ByteBuffer theBuffer = con.readBuffer;
        if (theBuffer == null) {
            theBuffer = con.processor.getBufferPool().allocate(con.processor.getBufferPool().getChunkSize());
            con.readBuffer = theBuffer;
            channel.read(theBuffer, this, aioReadHandler);

        } else if (theBuffer.hasRemaining()) {
            channel.read(theBuffer, this, aioReadHandler);
        } else {
            throw new java.lang.IllegalArgumentException("full buffer to read ");
        }

    }

    private void asynWrite(final ByteBuffer buffer) {

        buffer.flip();
        this.channel.write(buffer, this, aioWriteHandler);

    }

    /**
     * return true ,means no more data
     *
     * @return
     */
    private boolean write0() {
        if (!writing.compareAndSet(false, true)) {
            return false;
        }
        ByteBuffer theBuffer = con.writeBuffer;
        if (theBuffer == null || !theBuffer.hasRemaining()) {// writeFinished,但要区分bufer是否NULL，不NULL，要回收
            if (theBuffer != null) {
                con.recycle(theBuffer);
                con.writeBuffer = null;

            }
            // poll again
            ByteBuffer buffer = con.writeQueue.poll();
            // more data
            if (buffer != null) {
                if (buffer.limit() == 0) {
                    con.recycle(buffer);
                    con.writeBuffer = null;
                    con.close("quit cmd");
                    writing.set(false);
                    return true;
                } else {
                    con.writeBuffer = buffer;
                    asynWrite(buffer);
                    return false;
                }
            } else {
                // no buffer
                writing.set(false);
                return true;
            }
        } else {
            theBuffer.compact();
            asynWrite(theBuffer);
            return false;
        }

    }

    protected void onWriteFinished(int result) {

        con.netOutBytes += result;
        con.processor.addNetOutBytes(result);
        con.lastWriteTime = TimeUtil.currentTimeMillis();
        boolean noMoreData = this.write0();
        if (noMoreData) {
            this.doNextWriteCheck();
        }

    }

    @Override
    public void doNextWriteCheck() {

        boolean noMoreData = false;
        noMoreData = this.write0();
        if (noMoreData
                && !con.writeQueue.isEmpty()) {
            this.write0();
        }

    }

    @Override
    public boolean checkAlive() {
        return channel.isOpen();
    }

    @Override
    public void disableRead() {

    }

    @Override
    public void enableRead() {

    }
}

class AIOWriteHandler implements CompletionHandler<Integer, AIOSocketWR> {

    @Override
    public void completed(final Integer result, final AIOSocketWR wr) {
        try {

            wr.writing.set(false);

            if (result >= 0) {
                wr.onWriteFinished(result);
            } else {
                wr.con.close("write erro " + result);
            }
        } catch (Exception e) {
            AbstractConnection.LOGGER.warn("caught aio process err:", e);
        }

    }

    @Override
    public void failed(Throwable exc, AIOSocketWR wr) {
        wr.writing.set(false);
        wr.con.close("write failed " + exc);
    }

}

class AIOReadHandler implements CompletionHandler<Integer, AIOSocketWR> {
    @Override
    public void completed(final Integer i, final AIOSocketWR wr) {
        // con.getProcessor().getExecutor().execute(new Runnable() {
        // public void run() {
        if (i > 0) {
            try {
                wr.con.onReadData(i);
                wr.con.asynRead();
            } catch (IOException e) {
                wr.con.close("handle err:" + e);
            }
        } else if (i == -1) {
            // System.out.println("read -1 xxxxxxxxx "+con);
            wr.con.close("client closed");
        }
        // }
        // });
    }

    @Override
    public void failed(Throwable exc, AIOSocketWR wr) {
        wr.con.close(exc.toString());

    }
}
